package com.yiwyn;


import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.yiwyn.sink.CustomSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCApplication {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .password("root")
                .username("root")
                .serverTimeZone("UTC")
                .databaseList("cdc-demo")
                .tableList("cdc-demo.user").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();


        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-source").setParallelism(1).addSink(new CustomSink());

        env.execute("mysql-source");


    }

}
